﻿using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data.Common;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using System.Transactions;

using iTool.Cloud.Center.ServiceProvider.StorageProvider;

using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;

using Newtonsoft.Json;

using Orleans;

namespace iTool.Cloud.Center.ServiceProvider
{
    public class ClusterLoggerService : Orleans.Grain, IClusterLoggerService
    {

        const string DirectoryPath = "./Logger";
        const string DataSource = DirectoryPath + "/{0}.logs";
        string connectionString = string.Empty, tableName;
        private Subject<Logger> _subject;

        public ClusterLoggerService()
        {
            if (!Directory.Exists(DirectoryPath))
                Directory.CreateDirectory(DirectoryPath);

            _subject = new Subject<Logger>();
            _subject.Buffer(TimeSpan.FromSeconds(1), 4000)
                .Where(x => x.Count > 0)
                .Select(list => Observable.FromAsync(() => BatchWriteAsync(list)))
                .Concat()
                .Subscribe();
        }
        public async override Task OnActivateAsync()
        {
            this.tableName = this.GetPrimaryKeyString();
            this.connectionString = new SqliteConnectionStringBuilder
            {
                DataSource = String.Format(DataSource, tableName),
                Mode = SqliteOpenMode.ReadWriteCreate,
                Cache = SqliteCacheMode.Shared,
                Pooling = true
            }.ToString();

            string[] sqls = new string[] { 
                $"CREATE TABLE IF NOT EXISTS LogTrace(typeName varchar, message varchar, trackId long, createDate any)",
                $"CREATE TABLE IF NOT EXISTS LogDebug(typeName varchar, message varchar, trackId long, createDate any)",
                $"CREATE TABLE IF NOT EXISTS LogInformation(typeName varchar, message varchar, trackId long, createDate any)",
                $"CREATE TABLE IF NOT EXISTS LogWarning(typeName varchar, message varchar, trackId long, createDate any)",
                $"CREATE TABLE IF NOT EXISTS LogError(typeName varchar, message varchar, trackId long, createDate any)",
                $"CREATE TABLE IF NOT EXISTS LogCritical(typeName varchar, message varchar, trackId long, createDate any)",
                $"CREATE TABLE IF NOT EXISTS LogNone(typeName varchar, message varchar, trackId long, createDate any)",
            };

            using (var connection = new SqliteConnection(this.connectionString))
            {
                connection.Open();
                foreach (var sql in sqls)
                {
                    using (SqliteCommand command = new SqliteCommand(sql, connection))
                    {
                        command.ExecuteNonQuery();
                    }
                }
            }

            await base.OnActivateAsync();
        }

        public Task LogCriticalAsync(string typeName, string message, long trackId)
        {
            this._subject.OnNext(new Logger 
            {
                TrackId = trackId,
                LogLevel = LogLevel.Critical,
                TypeName = typeName,
                Message = message,
                CreateDate = DateTime.Now
            });

            return Task.CompletedTask;
        }

        public Task LogDebugAsync(string typeName, string message, long trackId)
        {
            this._subject.OnNext(new Logger
            {
                TrackId = trackId,
                LogLevel = LogLevel.Debug,
                TypeName = typeName,
                Message = message,
                CreateDate = DateTime.Now
            });

            return Task.CompletedTask;
        }

        public Task LogErrorAsync(string typeName, string message, long trackId)
        {
            this._subject.OnNext(new Logger
            {
                TrackId = trackId,
                LogLevel = LogLevel.Error,
                TypeName = typeName,
                Message = message,
                CreateDate = DateTime.Now
            });

            return Task.CompletedTask;
        }

        public Task LogInformationAsync(string typeName, string message, long trackId)
        {
            this._subject.OnNext(new Logger
            {
                TrackId = trackId,
                LogLevel = LogLevel.Information,
                TypeName = typeName,
                Message = message,
                CreateDate = DateTime.Now
            });

            return Task.CompletedTask;
        }

        public Task LogNoneAsync(string typeName, string message, long trackId)
        {
            this._subject.OnNext(new Logger
            {
                TrackId = trackId,
                LogLevel = LogLevel.None,
                TypeName = typeName,
                Message = message,
                CreateDate = DateTime.Now
            });

            return Task.CompletedTask;
        }

        public Task LogTraceAsync(string typeName, string message, long trackId)
        {
            this._subject.OnNext(new Logger
            {
                TrackId = trackId,
                LogLevel = LogLevel.Trace,
                TypeName = typeName,
                Message = message,
                CreateDate = DateTime.Now
            });

            return Task.CompletedTask;
        }

        public Task LogWarningAsync(string typeName, string message, long trackId)
        {
            this._subject.OnNext(new Logger
            {
                TrackId = trackId,
                LogLevel = LogLevel.Warning,
                TypeName = typeName,
                Message = message,
                CreateDate = DateTime.Now
            });

            return Task.CompletedTask;
        }

        private async Task BatchWriteAsync(IList<Logger> loggers)
        {
            using (var connection = new SqliteConnection(connectionString))
            {
                connection.Open();

                var transaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadUncommitted);

                try
                {
                    foreach (var item in loggers)
                    {
                        using (SqliteCommand sqliteCommand = new SqliteCommand(item.GetInsertSql(), connection, transaction))
                        {
                            item.SetSqlParameter(sqliteCommand);
                            sqliteCommand.ExecuteNonQuery();
                        }
                    }
                    transaction.Commit();
                }
                catch (Exception ex)
                {
                    this._subject.OnNext(new Logger
                    {
                        TrackId = 0,
                        LogLevel = LogLevel.Error,
                        TypeName = "this",
                        Message = ex.Message,
                        CreateDate = DateTime.Now
                    });
                    transaction.Rollback();
                }

                await connection.CloseAsync();
            }
        }

        public async Task<string> QueryLoggerAsync(string query)
        {
            using (var connection = new SqliteConnection(connectionString))
            {
                connection.Open();
                using (var command = new SqliteCommand(query, connection))
                {
                    await using (var reader = await command.ExecuteReaderAsync())
                    {
                        var json = this.ToJson(reader);
                        return json;
                    }
                }
            }
        }

        private string ToJson(DbDataReader dataReader)
        {
            Dictionary<int, (Type, string)> keyValues = new Dictionary<int, (Type, string)>();
            for (int i = 0; i < dataReader.FieldCount; i++)
            {
                var field = dataReader.GetName(i);
                Type type = dataReader.GetFieldType(i);
                keyValues.Add(i, (type, field));
            }

            StringBuilder jsonString = new StringBuilder("[");
            while (dataReader.Read())
            {
                jsonString.Append('{');

                for (int i = 0; i < dataReader.FieldCount; i++)
                {
                    jsonString.Append(String.Format("\"{0}\":", keyValues[i].Item2));
                    if (keyValues[i].Item1 == typeof(string))
                        jsonString.Append(String.Format("\"{0}\",", dataReader[i]));
                    else
                        jsonString.Append(String.Format("{0},", dataReader[i]));
                }

                jsonString[^1] = '}';
                jsonString.Append(",");
            }

            if (dataReader.HasRows)
                jsonString[^1] = ']';
            else
                jsonString.Append(']');

            _ = keyValues;
            return jsonString.ToString();
        }

    }
}
